feat(go-forwarder): add SQS as failed event storage #1152
)

const (
	maxSizePerSQSMessage = 1000 * 1024 // Overhead for other attributes than message body
The maximum SQS message size is 1 MiB.
However, all the attributes of sqs.SendMessageInput count toward that limit, not just the message body, so we have to keep some headroom.
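A minimal sketch of how this headroom could be applied when grouping payloads before sending. The constant mirrors the one in the diff; splitBySize and errTooLarge are hypothetical names for illustration, not code from this PR, and the sketch assumes the limit is applied to the summed body size of each group.

```go
package main

import (
	"errors"
	"fmt"
)

// Mirrors the constant in the diff: 1000 KiB for bodies, which leaves
// 24 KiB of the 1 MiB limit as headroom for the other request attributes.
const maxSizePerSQSMessage = 1000 * 1024

// errTooLarge is a hypothetical sentinel error for this sketch.
var errTooLarge = errors.New("payload exceeds maxSizePerSQSMessage")

// splitBySize (hypothetical helper) groups payloads so that the summed body
// size of each group stays within limit. A payload that cannot fit on its
// own is reported as an error instead of being silently dropped.
func splitBySize(payloads [][]byte, limit int) ([][][]byte, error) {
	var groups [][][]byte
	var current [][]byte
	size := 0
	for _, p := range payloads {
		if len(p) > limit {
			return nil, fmt.Errorf("%w: %d bytes", errTooLarge, len(p))
		}
		// Close the current group when adding p would cross the limit.
		if size+len(p) > limit && len(current) > 0 {
			groups = append(groups, current)
			current, size = nil, 0
		}
		current = append(current, p)
		size += len(p)
	}
	if len(current) > 0 {
		groups = append(groups, current)
	}
	return groups, nil
}

func main() {
	payloads := [][]byte{
		make([]byte, 600*1024),
		make([]byte, 500*1024),
		make([]byte, 100*1024),
	}
	groups, err := splitBySize(payloads, maxSizePerSQSMessage)
	fmt.Println(len(groups), err)
}
```

The real code would additionally have to respect the per-request entry cap of SendMessageBatch, which this sketch leaves out.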
	return nil
}

func (b *Batcher) StartYield(items []json.RawMessage) iter.Seq2[json.RawMessage, error] {
Discussed IRL: let's try the two-channel approach.
	return Batch{
		Data:       batch,
		StorageTag: out.Metadata[metadataStorageTagKey],
		DeleteKey:  aws.ToString(object.Key),
Isn't the key already known to the getter? Why are we returning it with the batch?
Otherwise we would have to either return (body io.Reader, storageTag string, err error) or manually set storedBatch.DeleteKey=... inside the loop.
Since we are forced to encapsulate the GetObject call in a function anyway (we have to defer out.Body.Close()), I found it simpler to encapsulate the whole batching logic inside getBatch than to additionally set the DeleteKey inside the loop.
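To illustrate the point about defer, here is a rough sketch of the shape described above. It does not use the AWS SDK: object, fetchFunc, stubFetch and the value of metadataStorageTagKey are stand-ins assumed for the example, and Batch is a simplified version of the struct in the diff.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Batch is a simplified stand-in for the struct in the diff.
type Batch struct {
	Data       []byte
	StorageTag string
	DeleteKey  string
}

// object is a hypothetical stand-in for the GetObject output: a body that
// must be closed, plus user metadata.
type object struct {
	Body     io.ReadCloser
	Metadata map[string]string
}

// fetchFunc abstracts the GetObject call so the sketch runs without AWS.
type fetchFunc func(key string) (*object, error)

const metadataStorageTagKey = "storage-tag" // assumed value

// getBatch wraps one fetch so that the deferred Close runs when this
// function returns, i.e. once per object, not at the end of the caller's loop.
func getBatch(fetch fetchFunc, key string) (Batch, error) {
	out, err := fetch(key)
	if err != nil {
		return Batch{}, fmt.Errorf("get %q: %w", key, err)
	}
	defer out.Body.Close()

	data, err := io.ReadAll(out.Body)
	if err != nil {
		return Batch{}, fmt.Errorf("read %q: %w", key, err)
	}
	return Batch{
		Data:       data,
		StorageTag: out.Metadata[metadataStorageTagKey],
		DeleteKey:  key,
	}, nil
}

// stubFetch returns a fetchFunc that serves a fixed payload, for demonstration.
func stubFetch(payload, tag string) fetchFunc {
	return func(key string) (*object, error) {
		return &object{
			Body:     io.NopCloser(strings.NewReader(payload)),
			Metadata: map[string]string{metadataStorageTagKey: tag},
		}, nil
	}
}

func main() {
	fetch := stubFetch(`[{"message":"hello"}]`, "logs")
	for _, key := range []string{"failed/a", "failed/b"} {
		b, err := getBatch(fetch, key)
		fmt.Println(b.DeleteKey, b.StorageTag, len(b.Data), err)
	}
}
```

With this shape the loop body stays a single call, and the caller never has to remember to attach the key afterwards.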
func NewConfig(maxItemSize, maxBatchSize, maxItemsPerBatch int) Config {
	return Config{
		maxItemSize:      maxItemSize,
		maxBatchSize:     maxBatchSize,
		maxItemsPerBatch: maxItemsPerBatch,
I had three options to reuse the Batcher.Start() method:
1. Keep the old functional option pattern and change the in channel from model.LogEntry to any. Too restrictive: we would lose type safety and change the Handler signature.
2. Keep the old functional option pattern and make both the Batcher and the Option generic. Too verbose, e.g. batching.WithMaxItemSize[model.LogEntry](1*1024*1024) three times, and it does not provide any value since only the Start method leverages genericity.
3. Delete the old functional option pattern, use a config struct directly and make the batcher generic.
Opted for 3.
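For readers following along, option 3 could look roughly like the sketch below. Config and NewConfig mirror the diff; the Batcher fields, the sizeOf hook, NewBatcher and the body of Start are assumptions made for illustration and may differ from the actual implementation.

```go
package main

import "fmt"

// Config mirrors the struct in the diff.
type Config struct {
	maxItemSize      int
	maxBatchSize     int
	maxItemsPerBatch int
}

func NewConfig(maxItemSize, maxBatchSize, maxItemsPerBatch int) Config {
	return Config{
		maxItemSize:      maxItemSize,
		maxBatchSize:     maxBatchSize,
		maxItemsPerBatch: maxItemsPerBatch,
	}
}

// Batcher is generic over the item type. sizeOf is a hypothetical hook that
// tells the batcher how many bytes an item counts for.
type Batcher[T any] struct {
	cfg    Config
	sizeOf func(T) int
}

func NewBatcher[T any](cfg Config, sizeOf func(T) int) *Batcher[T] {
	return &Batcher[T]{cfg: cfg, sizeOf: sizeOf}
}

// Start reads items from in and emits batches on the returned channel,
// which is closed once in is closed and the last batch has been sent.
func (b *Batcher[T]) Start(in <-chan T) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		var batch []T
		size := 0
		flush := func() {
			if len(batch) > 0 {
				out <- batch
				batch, size = nil, 0
			}
		}
		for item := range in {
			n := b.sizeOf(item)
			if n > b.cfg.maxItemSize {
				continue // oversized item: simply skipped in this sketch
			}
			if size+n > b.cfg.maxBatchSize || len(batch) >= b.cfg.maxItemsPerBatch {
				flush()
			}
			batch = append(batch, item)
			size += n
		}
		flush()
	}()
	return out
}

func main() {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, s := range []string{"aa", "bb", "cc", "dd", "ee"} {
			in <- s
		}
	}()
	// T is inferred as string from the sizeOf argument.
	b := NewBatcher(NewConfig(10, 100, 2), func(s string) int { return len(s) })
	for batch := range b.Start(in) {
		fmt.Println(batch)
	}
}
```

The type parameter is written once at construction (or inferred), which is the verbosity win over repeating it on every functional option.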
ge0Aja
left a comment
lgtm i trust you on the comments
- share AWS config between sdk clients
- misc
- feat(go-forwarder): add SQS as failed event storage
- delete ChangeMessageVisibilityBatch API call
- Add functional option pattern to batching and use batching inside SQS
- Delete old Store implementation + change message on debug
This PR adds SQS as a failed event storage.
A refactor was needed on storing.Storage, which previously was not abstract enough to support diverse storage services. A new storing.Batch struct has been created to represent failed batches from the forwarder. Batch.DeleteHandle is a unique identifier of the stored batch in the underlying storage (key for S3, receipt handle for SQS), used to delete the batch if the retry succeeds.

Diverging behaviors from the Python implementation are:
- sqs.SendMessageBatch, a batch equivalent of sqs.SendMessage, is used to minimize API calls on Store().
- retry_prefix, which would equal either logs, metrics or traces and was stored as a message attribute, is deleted since we only deal with logs.
- function_prefix, which would equal the sha1 of the forwarder's ARN and was stored as a message attribute, is deleted. It was used to differentiate messages sent by the forwarder from those of other consumers. Sharing the same SQS queue between unrelated use cases is a bad design and would significantly increase API costs by receiving noisy messages.

Side notes:
- DD_STORE_FAILED_EVENTS, DD_S3_BUCKET_NAME and DD_SQS_QUEUE_URL all wrap the same logic around the retry mechanism: storing failed events. Imo, we should only keep one of these (since it is one or the other), or add a new DD_FAILED_EVENT_STORAGE which would directly take either an S3 bucket name or an SQS queue URL, if such identification is feasible.
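On whether such identification is feasible: a rough heuristic sketch, not something this PR implements. detectStorage and the storageKind type are hypothetical names, and the check assumes queue URLs of the form https://sqs.<region>.amazonaws.com/<account>/<queue>; legacy or custom endpoints would not be recognized. It relies on the fact that S3 bucket names cannot contain "/" or ":".

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// storageKind is a hypothetical enum for the sketch.
type storageKind int

const (
	storageUnknown storageKind = iota
	storageS3
	storageSQS
)

func (k storageKind) String() string {
	switch k {
	case storageS3:
		return "s3"
	case storageSQS:
		return "sqs"
	default:
		return "unknown"
	}
}

// detectStorage (hypothetical) classifies the value of a single
// DD_FAILED_EVENT_STORAGE-style variable as an SQS queue URL or an S3
// bucket name. Anything ambiguous is reported as unknown.
func detectStorage(value string) storageKind {
	if value == "" {
		return storageUnknown
	}
	// Assumption: queue URLs are https URLs whose host starts with "sqs.".
	if u, err := url.Parse(value); err == nil && u.Scheme == "https" && strings.HasPrefix(u.Host, "sqs.") {
		return storageSQS
	}
	// Bucket names never contain "/" or ":", so a bare name is treated as S3.
	if !strings.ContainsAny(value, "/:") {
		return storageS3
	}
	return storageUnknown
}

func main() {
	for _, v := range []string{
		"https://sqs.us-east-1.amazonaws.com/123456789012/failed-events",
		"my-failed-events-bucket",
		"ftp://example.com/whatever",
	} {
		fmt.Println(v, "->", detectStorage(v))
	}
}
```

Returning unknown for anything ambiguous lets the forwarder fail fast at startup instead of guessing a storage backend.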